package com.xiaofan.java;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Locale;
import java.util.regex.Pattern;

/**
 * Flink streaming job that counts word occurrences in a text file.
 *
 * <p>Each input line is tokenized by {@link Tokenizer}, the resulting
 * {@code (word, 1)} pairs are keyed by the word, and the integer field is
 * summed per key. The running counts are written out as text.
 */
public class WordCount_A0001 {

    /**
     * Builds the word-count pipeline and submits it for execution.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final String inputPath = "hdfs://cluster/data";
        final String outputPath = "hdfs://cluster/result";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.readTextFile(inputPath);

        // The explicit KeySelector cast gives the lambda a concrete generic
        // signature for the key type.
        KeySelector<Tuple2<String, Integer>, String> byWord = value -> value.f0;

        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .keyBy(byWord)
                .sum(1); // field index 1 holds the per-word count

        counts.writeAsText(outputPath);

        env.execute("WordCount_A0001");
    }

    /**
     * Splits each line into lower-cased tokens and emits a {@code (token, 1)}
     * pair for every non-empty token.
     *
     * <p>Lines are split on runs of one or more non-word characters
     * ({@code \W+}); letters, digits and underscore are therefore kept inside
     * tokens.
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /** Delimiter pattern, compiled once instead of on every record. */
        private static final Pattern NON_WORD = Pattern.compile("\\W+");

        /**
         * Tokenizes one input line.
         *
         * @param value the input line
         * @param out   collector receiving one {@code (token, 1)} pair per token
         * @throws Exception declared by the {@link FlatMapFunction} contract
         */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Locale.ROOT keeps lower-casing independent of the JVM's default
            // locale (e.g. Turkish would map 'I' to dotless 'ı'), so keys are
            // identical on every machine that runs the job.
            String[] tokens = NON_WORD.split(value.toLowerCase(Locale.ROOT));

            for (String token : tokens) {
                // A line that starts with a delimiter yields a leading empty token.
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
